{
  "metadata" : {
    "name" : "Twitter stream",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : [ "org.apache.spark %% spark-streaming-twitter % _" ],
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : {
      "id" : "10DDDB91FC65435B99BD43F79F7E13CF"
    },
    "cell_type" : "markdown",
    "source" : "#Twitter example"
  }, {
    "metadata" : {
      "id" : "4A3F770419114D0F86EDEBABE4C8D370"
    },
    "cell_type" : "markdown",
    "source" : "## Set up"
  }, {
    "metadata" : {
      "id" : "1F42610DAF8E43118C3BB863B630455F"
    },
    "cell_type" : "markdown",
    "source" : "**Note:** we are using the `env` variables here. For this, adapt the following and execute before launching the server\n```\n  export TWITTER_CONSUMER_KEY=...\n  export TWITTER_CONSUMER_SECRET=\"...\n  export TWITTER_ACCESS_TOKEN=...\n  export TWITTER_ACCESS_TOKEN_SECRET=...\n```"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "82A740C58B0440758311834175A489F1"
    },
    "cell_type" : "code",
    "source" : "def $(s:String) = sys.env(s)\nSystem.setProperty(\"twitter4j.oauth.consumerKey\", $(\"TWITTER_CONSUMER_KEY\"))\nSystem.setProperty(\"twitter4j.oauth.consumerSecret\", $(\"TWITTER_CONSUMER_SECRET\"))\nSystem.setProperty(\"twitter4j.oauth.accessToken\", $(\"TWITTER_ACCESS_TOKEN\"))\nSystem.setProperty(\"twitter4j.oauth.accessTokenSecret\", $(\"TWITTER_ACCESS_TOKEN_SECRET\"))\n\"twitter settings done!\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "2AFCD26ED37D4C2F8BCE12AFCAF53CF8"
    },
    "cell_type" : "markdown",
    "source" : "## Spark streaming"
  }, {
    "metadata" : {
      "id" : "01F6AC421D3043018DB9B08946C86C9F"
    },
    "cell_type" : "markdown",
    "source" : "### Create context with batch 2s "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "4505F2ACCD98407AA4C71F1EB12A303B"
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.streaming.{Seconds, StreamingContext}\nimport org.apache.spark.SparkContext._\nimport org.apache.spark.streaming.twitter._\n\nval ssc = new StreamingContext(sparkContext, Seconds(2))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "8EFC28E41EC143098F5864C1D73B1191"
    },
    "cell_type" : "markdown",
    "source" : "### Listen twitter stream "
  }, {
    "metadata" : {
      "id" : "4F9A3CF961A742718B48E596F1E84623"
    },
    "cell_type" : "markdown",
    "source" : "#### We're going to **filter** the tweets to only those containing the following words."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "D94D5B81B873417282F7935B0E454FFF"
    },
    "cell_type" : "code",
    "source" : "val filters = Array(\"spark\", \"scala\", \"music\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "9720607EB7C14CA985E8073C8E697743"
    },
    "cell_type" : "markdown",
    "source" : "#### Create the twitter listeners"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "8078A46C6A5441548438DBFEBAC7808A"
    },
    "cell_type" : "code",
    "source" : "val twitterStream = TwitterUtils.createStream(ssc, None, filters)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "FF7D592790DC476FB06312D34C28078D"
    },
    "cell_type" : "markdown",
    "source" : "#### Count by hashtag and sort  "
  }, {
    "metadata" : {
      "id" : "CE41954CB8CE433580CC2B5F653F2A18"
    },
    "cell_type" : "markdown",
    "source" : "##### The windows are `60s` long"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "D6BC245BD18040318003DB8BB7FA3C51"
    },
    "cell_type" : "code",
    "source" : "import StreamingContext._\nval hashTags = twitterStream.flatMap(status => status.getText.split(\" \").filter(_.startsWith(\"#\")))\n\nval topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))\n                          .map{case (topic, count) => (count, topic)}\n                          .transform(_.sortByKey(false))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "99A336070E1A4997B26F0E964FD5AEEB"
    },
    "cell_type" : "markdown",
    "source" : "##### Creating the text output to be updated by the stream of result "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "output_stream_collapsed" : true,
      "collapsed" : true,
      "id" : "119116542F8F49E284EE938F11481BD0"
    },
    "cell_type" : "code",
    "source" : "val result = ul(10)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "8AFBCCEC974949D28444D37CE1CB9C14"
    },
    "cell_type" : "markdown",
    "source" : "##### Let's show the 10 first hashtag every window "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "2AE08FE0C1CB47708BDBA089CE59E839"
    },
    "cell_type" : "code",
    "source" : "// Print popular hashtags\ntopCounts60.foreachRDD(rdd => {\n  val topList = rdd.take(10).toList\n  val r = topList.map{case (count, tag) => s\"$tag: $count\"}\n  result(r)\n})",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "CBA1D8623D99451D83A25A03D599EE98"
    },
    "cell_type" : "markdown",
    "source" : "### Show the Geolocations"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "2A768ECCAE2148CE879D4E7A90B128B6"
    },
    "cell_type" : "code",
    "source" : "val geo = widgets.GeoPointsChart(Seq((0d,0d, \"init\")))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "output_stream_collapsed" : true,
      "collapsed" : true,
      "id" : "13D77A7B07024DC39E20BE968AFA9491"
    },
    "cell_type" : "code",
    "source" : "twitterStream .window(Seconds(60), Seconds(6))  \n              . filter{ s => \n                s.getGeoLocation() != null\n              }\n              .map(s => (s.getGeoLocation().getLatitude(), s.getGeoLocation().getLongitude(), s.getText()))\n              .foreachRDD{rdd => \n                geo.applyOn(rdd.take(100))\n              }",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "748F1312C73A49F294C6F7E46DFA746D"
    },
    "cell_type" : "markdown",
    "source" : "###  Start listening twitter"
  }, {
    "metadata" : {
      "id" : "D2BC51D84F50429085A13E7AADDB80CE"
    },
    "cell_type" : "markdown",
    "source" : "This will listen the twitter stream, and the computation above will update the `resuilt` every `2s` using the last `60s` of values."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "2400F651DF1D44AB8FB8961591619612"
    },
    "cell_type" : "code",
    "source" : "ssc.start()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "568F13E3EC344676AAC11E3034008567"
    },
    "cell_type" : "markdown",
    "source" : "### Stop listening twitter "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "856AA6E0E0F24FE990EB5FBD6B638A33"
    },
    "cell_type" : "code",
    "source" : "// commented to all 'run all' :-D\n//ssc.stop()",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}